package ssm

import com.sun.prism.PixelFormat.DataType
import org.apache.parquet.format.IntType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import java.util.Properties

object NameCount {

  /**
   * Entry point: polls the source table every 10 seconds and refreshes the
   * per-lesson click-count table in MySQL.
   *
   * NOTE(review): credentials are hard-coded; move user/password to
   * configuration or environment variables before deploying anywhere shared.
   */
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "z9633352")
    // NOTE(review): "com.mysql.jdbc.Driver" is the legacy Connector/J class;
    // MySQL Connector/J 8.x renamed it to "com.mysql.cj.jdbc.Driver" — confirm
    // which connector version is on the classpath.
    prop.put("driver", "com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/python_db"
    while (true) {
      Thread.sleep(10000) // poll interval: 10 s
      println("开始更新课程点击量")
      lessCount(prop, url)
    }
  }

  /**
   * Reads lesson names from the `warehouse` table, merges variant course
   * names into canonical ones, counts clicks per course, and overwrites the
   * `lesson_counts` table.
   *
   * Pipeline: MySQL -> DataFrame -> RDD[String]
   *           -> replaceAll (normalize course names)
   *           -> (name, count) via reduceByKey
   *           -> RDD[Row] -> DataFrame -> MySQL.
   *
   * @param prop JDBC connection properties (user, password, driver)
   * @param url  JDBC URL of the source/target database
   */
  def lessCount(prop: Properties, url: String): Unit = {
    // getOrCreate reuses the same session across polling iterations. The
    // session is deliberately NOT stopped here: main() calls this method in
    // an infinite loop, and the previous code stopped the SparkContext on
    // every pass, forcing a full teardown/recreation each cycle.
    val spark = SparkSession.builder().appName("NameCount").master("local").getOrCreate()

    // MySQL -> DataFrame holding only the lesson-name column.
    val dataFrame = spark.read.jdbc(url, "warehouse", prop).select("name")

    // DataFrame -> RDD[String]: one lesson name per record.
    val names = dataFrame.rdd.map(_.mkString(""))

    // Merge variant course names into their canonical course before counting.
    val normalized = names
      .map(_.replaceAll("10 HoursCrash", "Hadoop")) // Hadoop's click count is currently too large; fold this variant in
      .map(_.replaceAll("Spark Streaming", "Spark"))
      .map(_.replaceAll("Hive", "Hadoop"))
      .map(_.replaceAll("Spark SQL", "Spark"))
      .map(_.replaceAll("Blink", "Flink"))

    // Count clicks per course, highest counts first.
    val counted = normalized.map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // RDD[(String, Int)] -> RDD[Row] matching the target table schema.
    val rows = counted.map { case (lesson, counts) => Row(lesson, counts) }

    val schema = StructType(Array(
      StructField("lesson", StringType),
      StructField("counts", IntegerType)
    ))
    val resultDF = spark.createDataFrame(rows, schema)
    resultDF.show()

    println("开始写入数据库")
    // "overwrite" drops and recreates lesson_counts on every refresh.
    resultDF.write.mode("overwrite").jdbc(url, "lesson_counts", prop)
    println("完成写入数据库,因为报错会中断进程")
  }
}
